1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import unit_threaded.from;
8 
/**
 * Passes `args` to `writeln` on the current test's writer.
 *
 * The whole body sits in a `debug` block, so it is only compiled into debug
 * builds. Even then nothing is written unless `isDebugOutputEnabled`
 * returns true.
 */
void writelnUt(T...)(auto ref T args) {
    debug {
        import unit_threaded.testcase : TestCase;

        // Debug output switched off: stay silent.
        if (!isDebugOutputEnabled)
            return;

        TestCase.currentTest.getWriter.writeln(args);
    }
}
20 
private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Whether `escCode` hands out real escape sequences; set by the module constructor.
bool _useEscCodes;
/**
 * ANSI escape sequences, indexed by `Colour` (red, green, yellow, cancel).
 *
 * Stored as immutable data rather than as an `enum` manifest constant: an
 * `enum` array literal is re-created at every point of use, which meant a
 * fresh allocation for each runtime-indexed lookup in `escCode`.
 */
immutable string[] _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];
25 
/**
 * Module constructor. On Posix it decides whether escape codes are used:
 * either they were forced, or stdout refers to a terminal. On other
 * platforms `_useEscCodes` is left untouched.
 */
static this() {
    version (Posix) {
        import core.sys.posix.unistd : isatty;
        import std.stdio : stdout;

        if (_forceEscCodes) {
            // Forced: no need to ask whether stdout is a terminal.
            _useEscCodes = true;
        } else {
            _useEscCodes = isatty(stdout.fileno()) != 0;
        }
    }
}
34 
/**
 * Sets the debug-output flag that `isDebugOutputEnabled` reports.
 *
 * Params:
 *     value = new state of the flag; enabling is the default
 */
package void enableDebugOutput(bool value = true) nothrow {
    // The flag is shared between threads, so it is written under a lock.
    synchronized _debugOutput = value;
}
40 
/**
 * Reads the debug-output flag under the same kind of lock used to set it.
 *
 * Returns: the current value of `_debugOutput`
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    bool enabled;
    synchronized {
        enabled = _debugOutput;
    }
    return enabled;
}
46 
/**
 * Sets the flag requesting escape codes even when stdout is not a terminal.
 * The flag is consulted by the module constructor, not by this function.
 */
package void forceEscCodes() nothrow {
    // Shared flag, written under a lock.
    synchronized _forceEscCodes = true;
}
52 
/**
 * Destination for the text produced by the `write*` helpers in this module.
 * `WriterThread` is the implementation defined in this file.
 */
interface Output {
    /// Accepts one piece of already formatted text.
    void send(in string output) @safe;
    /// Requests a flush; what that entails is left to the implementation.
    void flush() @safe;
}
57 
/**
 * Colours understood by `escCode`. A member's value is used directly as an
 * index into `_escCodes`, so the order here has to match that array.
 */
private enum Colour {
    red, // _escCodes[0]
    green, // _escCodes[1]
    yellow, // _escCodes[2]
    cancel, // _escCodes[3]; appended after every coloured message by colour()
}
64 
/**
 * Wraps `msg` in the escape code for colour `C` and the cancel code.
 * Both codes are empty strings when escape codes are not in use.
 *
 * Params:
 *     C = the `Colour` to open with
 *     msg = the text to wrap
 * Returns: opening code, then `msg`, then cancel code
 */
private string colour(alias C)(in string msg) {
    auto opening = escCode(C);
    auto closing = escCode(Colour.cancel);
    return opening ~ msg ~ closing;
}
68 
// Shorthands: each one is colour!(...) bound to a single fixed Colour.
private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);
72 
/**
 * Looks up the escape sequence for `code`.
 *
 * Params:
 *     code = which entry of `_escCodes` to return
 * Returns: the escape sequence, or an empty string when `_useEscCodes` is false
 */
private string escCode(in Colour code) @safe {
    if (!_useEscCodes)
        return "";

    return _escCodes[code];
}
79 
/**
 * Converts `args` to a single string and hands it to `output.send`.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render with `std.conv.text`
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv : text;

    auto rendered = text(args);
    output.send(rendered);
}
88 
/**
 * Same as `write`, with a trailing newline added after `args`.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render
 */
void writeln(T...)(Output output, auto ref T args) {
    output.write(args, "\n");
}
95 
/**
 * Renders `args` followed by a newline and sends the result to `output`,
 * wrapped in the green and cancel escape codes. Both codes are empty strings
 * when escape codes are not in use.
 *
 * The newline is passed to `text` together with `args`, the way `writeln`
 * and `writelnRed` do it. `std.conv.text` needs at least one argument, so
 * this form also compiles when `args` is empty; the text sent for non-empty
 * `args` is unchanged.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv : text;

    output.send(green(text(args, "\n")));
}
105 
/**
 * Same as `writeRed`, with a trailing newline added after `args`. The
 * newline is part of the coloured text.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render
 */
void writelnRed(T...)(Output output, auto ref T args) {
    output.writeRed(args, "\n");
}
113 
/**
 * Renders `args` and sends the result to `output`, wrapped in the red and
 * cancel escape codes. Both codes are empty strings when escape codes are
 * not in use. No newline is added.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render with `std.conv.text`
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv : text;

    auto rendered = text(args);
    output.send(red(rendered));
}
123 
/**
 * Renders `args` and sends the result to `output`, wrapped in the yellow and
 * cancel escape codes. Both codes are empty strings when escape codes are
 * not in use. No newline is added.
 *
 * Params:
 *     output = where the text goes
 *     args = values to render with `std.conv.text`
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv : text;

    auto rendered = text(args);
    output.send(yellow(rendered));
}
133 
/**
 * `Output` implementation that forwards everything to a separate thread
 * running `threadWriter`.
 *
 * When compiled with `version (unitUnthreaded)` no thread is spawned:
 * `send` writes directly with `std.stdio.write` and `flush` does nothing.
 */
class WriterThread : Output {

    import std.concurrency : Tid;

    /**
     * Returns a reference to the only instance of this class.
     * The instance is created through `initOnce` on first use.
     */
    static WriterThread get() @trusted {
        import std.concurrency : initOnce;

        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /**
     * Sends `output` to the writer thread together with the calling
     * thread's Tid, or writes it directly in the unthreaded version.
     */
    override void send(in string output) @safe {

        version (unitUnthreaded) {
            import std.stdio : write;

            write(output);
        } else {
            import std.concurrency : send, thisTid;

            // The message send is wrapped in a @trusted lambda so this method can stay @safe.
            () @trusted{ _tid.send(output, thisTid); }();
        }
    }

    /**
     * Sends a `Flush` message, tagged with the calling thread's Tid, to the
     * writer thread. Does nothing in the unthreaded version.
     */
    override void flush() @safe {
        version (unitUnthreaded) {
        } else {
            import std.concurrency : send, thisTid;

            () @trusted{ _tid.send(Flush(), thisTid); }();
        }
    }

private:

    /**
     * Spawns `threadWriter` for stdout/stderr, then sends `ThreadWait` and
     * blocks until the thread answers with `ThreadStarted`.
     * Private: instances are only created by `get`.
     */
    this() {
        version (unitUnthreaded) {
        } else {
            import std.concurrency : spawn, thisTid, receiveOnly, send;
            import std.stdio : stdout, stderr;

            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    /// Tid of the spawned writer thread; left at its default in the unthreaded version.
    Tid _tid;
}
189 
/// Request to the writer thread; `threadWriter` answers it with `ThreadStarted`.
struct ThreadWait {
}

/// Tells `threadWriter` to leave its receive loop.
struct ThreadFinish {
}

/// Reply sent by `threadWriter` when it receives `ThreadWait`.
struct ThreadStarted {
}

/// Sent by `threadWriter` to the Tid it was given, as its last action.
struct ThreadEnded {
}

/// Flush request; `WriterThread.flush` sends it together with the sender's Tid.
struct Flush {
}
200 
// File name that threadWriter opens in place of stdout/stderr when debug
// output is disabled: "/dev/null" on Posix, "NUL" everywhere else.
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
206 
/**
 * Body of the writer thread: receives text from other threads and prints it
 * to the file `OUT` referred to when this function started.
 *
 * While it runs, and only if debug output is disabled, `OUT` and `ERR` are
 * replaced by files opened on `nullFileName`. Both are put back when the
 * loop ends or when an exception escapes.
 *
 * Handled messages:
 * $(UL
 *   $(LI `(string, Tid)`: text from a thread; printed at once if that thread
 *        is the current one, buffered otherwise)
 *   $(LI `ThreadWait`: answered with `ThreadStarted`)
 *   $(LI `ThreadFinish` and `OwnerTerminated`: end the loop)
 *   $(LI `(Flush, Tid)`: closes the sender's buffered chunk; if no other
 *        thread is current, prints every closed chunk and clears the
 *        current thread)
 * )
 *
 * Params:
 *     OUT = alias of the file used for regular output
 *     ERR = alias of the file used for error output
 *     tid = thread that receives `ThreadStarted` and `ThreadEnded`
 */
void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid) {
    import std.concurrency : receive, send, OwnerTerminated, Tid;

    // The first thread to send output becomes the current one. Until it sends
    // a Flush message no other thread may print, so their text is kept here.
    static struct ThreadOutput {
        string currentOutput; // text received since this thread's last Flush
        string[] outputs; // chunks closed by a Flush, not yet printed

        void store(in string msg) {
            currentOutput ~= msg;
        }

        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }

    bool finished = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure)
        restore();

    immutable silenceGlobals = !isDebugOutputEnabled();
    if (silenceGlobals) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // Always prints through the saved handle, never through the replaced OUT.
    void actuallyPrint(in string msg) {
        if (msg.length == 0)
            return;
        saveStdout.write(msg);
    }

    ThreadOutput[Tid] outputs;

    Tid currentTid;

    void onText(string msg, Tid originTid) {
        if (currentTid == Tid.init) {
            currentTid = originTid;

            // The new current thread may still have text that was buffered
            // while another thread was current.
            if (auto pending = originTid in outputs) {
                actuallyPrint(pending.currentOutput);
                pending.currentOutput = "";
            }
        }

        if (currentTid != originTid) {
            if (originTid !in outputs)
                outputs[originTid] = ThreadOutput.init;
            outputs[originTid].store(msg);
            return;
        }

        actuallyPrint(msg);
    }

    void onWait(ThreadWait w) {
        tid.send(ThreadStarted());
    }

    void onFinish(ThreadFinish f) {
        finished = true;
    }

    void onFlush(Flush f, Tid originTid) {
        if (auto buffered = originTid in outputs)
            buffered.flush();

        // Another thread is still current: its output must not be interleaved.
        if (currentTid != Tid.init && currentTid != originTid)
            return;

        foreach (ref threadOutput; outputs) {
            foreach (chunk; threadOutput.outputs)
                actuallyPrint(chunk);
            threadOutput.outputs = [];
        }

        currentTid = Tid.init;
    }

    void onOwnerTerminated(OwnerTerminated trm) {
        finished = true;
    }

    while (!finished)
        receive(&onText, &onWait, &onFinish, &onFlush, &onOwnerTerminated);

    restore();
    tid.send(ThreadEnded());
}